package cn.doitedu.dw_etl.utils

import org.apache.commons.lang3.StringUtils
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.types.{DataTypes, StructType}

/**
 * Generate the ID-mapping (device => GUID) dictionary
 */
object IdMappingGen {

  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.WARN)
    val spark = SparkSession.builder().appName("IDMAPPING dictionary generation").master("local[*]").getOrCreate()
    import spark.implicits._
    import org.apache.spark.sql.functions._

    // Load the T-day (today's) log
    val schema = new StructType()
      .add("deviceid", DataTypes.StringType)
      .add("account", DataTypes.StringType)
      .add("timestamp", DataTypes.LongType)
    val t_log = spark.read.option("header", true).schema(schema).csv("dw_etl/data/idmp_testdata/T_day_log")
    t_log.cache() // t_log is reused several times below
    t_log.show(100, false)

    // 1. Score the accounts that appear in today's access log

    // 1.1 Filter out device records that have no account login at all
    val ifBlank = (s: String) => {
      StringUtils.isBlank(s)
    }
    spark.udf.register("isBlank", ifBlank)

    // Records that have an account login
    val haveAccount = t_log.where("!isBlank(account)")
    haveAccount.show(100, false)

    // Devices with no account login at all: max() ignores nulls, so "account"
    // stays null only when a device never had a non-null account today
    val noAccount = t_log.groupBy("deviceid").agg(max("account") as "account", min("timestamp") as "ts").where("account is null")

    // 1.2 For each account on each device, keep only its earliest record

    val firstRecord = haveAccount.groupBy("deviceid", "account").agg(min("timestamp") as "ts")

    firstRecord.show(100, false)

    // 1.3 Score each login account on each device (the earlier the login, the higher the score)
    val window = Window.partitionBy("deviceid").orderBy("ts")
    val scored = firstRecord.select('deviceid, 'account, 'ts, row_number() over (window) as "rn")
      .selectExpr("deviceid", "account", "ts", "100-(rn-1)*10 as score")

    scored.show(100, false)

    // 1.4 Union the scored records with the devices that have no account (score stays null)
    val t_result = scored.union(noAccount.selectExpr("deviceid", "account", "ts", "null as score"))

    t_result.show(100, false)

    // 2. Load the T-1 (previous day's) mapping dictionary
    // 2.1 Read the data
    val preIdmp = spark.read.json("dw_etl/data/idmp_testdata/T-1dict_out")
    preIdmp.printSchema()
    preIdmp.show(100, false)
    preIdmp.createTempView("preidmp")
    /**
     * +--------+-----+-----------------------------+
     * |deviceid|guid |uid_list                     |
     * +--------+-----+-----------------------------+
     * |did01   |u01  |[[u01, 100, 1]]              |
     * |did02   |u02  |[[u02, 100, 3], [u03, 90, 8]]|
     * |did03   |did03|[]                           |
     * +--------+-----+-----------------------------+
     */
    // 2.2 Flatten the nested uid_list structure
    val preUidScore = spark.sql(
      """
        |select
        |  deviceid,
        |  uid_score.account as account,
        |  uid_score.timestamp as ts,
        |  uid_score.score as score
        |from preidmp lateral view explode(uid_list) tmp as uid_score
        |""".stripMargin)

    preUidScore.show(100, false)

    // 2.3 Pull out the historical idmp rows whose account binding list is empty
    val preNoUid = preIdmp.where("size(uid_list)=0").selectExpr("deviceid", "null as account", "null as ts", "null as score")
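    // These devices had no account bound in the historical dictionary; carrying them
    // forward with null placeholders keeps them in today's dictionary even if they
    // do not show up in today's log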

    // 3. Merge the T-day and T-1-day data

    // 3.1 Aggregate the scores per (device, account)
    val wholeDevices = t_result.union(preUidScore).union(preNoUid)
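    // "whole" now contains: today's scored logins, today's devices with no login (null score),
    // yesterday's flattened account bindings, and yesterday's devices with no binding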
    wholeDevices.show(100, false)
    wholeDevices.createTempView("whole")

    val todayUidScoreResult = spark.sql(
      """
        |select
        |  deviceid,
        |  account,
        |  ts,
        |  score
        |from
        |(
        |  select
        |    deviceid,
        |    account,
        |    ts,
        |    score,
        |    row_number() over(partition by deviceid order by account desc) as rn
        |  from
        |  (
        |    select
        |      deviceid,
        |      account,
        |      min(ts) as ts,
        |      sum(score) as score
        |    from whole
        |    group by deviceid, account
        |  ) o1
        |) o2
        |where !(rn > 1 and account is null)
        |""".stripMargin)

    todayUidScoreResult.show(100,false)

    /**
     * Sample output (the trailing value notes the guid each device will end up with):
     * +--------+-------+---+-----+
     * |deviceid|account|ts |score|
     * +--------+-------+---+-----+
     * |did04   |null   |16 |null |  did04
     * |did03   |u04    |12 |100  |  u04
     * |did02   |u02    |3  |190  |  u02
     * |did02   |u03    |8  |190  |  u02
     * |did01   |u01    |1  |200  |  u01
     * +--------+-------+---+-----+
     */

    // 3.2 Gather each device's account scores together, pick the guid, and output in JSON format
    todayUidScoreResult.createTempView("tod")
    val finalResult = spark.sql(
      """
        |select
        |   deviceid,
        |   collect_list(if(account is not null,struct(account,timestamp,score),null)) as uid_list,
        |   max(guid) as guid
        |from
        |  (
        |     select
        |       deviceid,
        |       account,
        |       ts as timestamp,
        |       score,
        |       first_value(if(account is not null,account,deviceid)) over(partition by deviceid order by score desc,ts) as guid
        |     from
        |       tod
        |  ) o
        |group by deviceid
        |
        |""".stripMargin)
    finalResult.show(100,false)


    // Save the result (coalesce(1) writes a single output file)
    finalResult.coalesce(1).write.json("dw_etl/data/idmp_testdata/T_day_dict_out")
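    // Note: the default save mode fails if the output directory already exists;
    // if reruns are expected, .mode("overwrite") could be added before .json(...)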


    spark.close()
  }
}
